home *** CD-ROM | disk | FTP | other *** search
/ Clickx 96 / Clickx 96.iso / software / tools / tool / xbmc-10.1.exe / addons / script.module.pysqlite / lib / pysqlite2 / test / dbapi.py < prev    next >
Encoding:
Python Source  |  2009-10-19  |  27.3 KB  |  813 lines

  1. #-*- coding: ISO-8859-1 -*-
  2. # pysqlite2/test/dbapi.py: tests for DB-API compliance
  3. #
  4. # Copyright (C) 2004-2009 Gerhard HΣring <gh@ghaering.de>
  5. #
  6. # This file is part of pysqlite.
  7. #
  8. # This software is provided 'as-is', without any express or implied
  9. # warranty.  In no event will the authors be held liable for any damages
  10. # arising from the use of this software.
  11. #
  12. # Permission is granted to anyone to use this software for any purpose,
  13. # including commercial applications, and to alter it and redistribute it
  14. # freely, subject to the following restrictions:
  15. #
  16. # 1. The origin of this software must not be misrepresented; you must not
  17. #    claim that you wrote the original software. If you use this software
  18. #    in a product, an acknowledgment in the product documentation would be
  19. #    appreciated but is not required.
  20. # 2. Altered source versions must be plainly marked as such, and must not be
  21. #    misrepresented as being the original software.
  22. # 3. This notice may not be removed or altered from any source distribution.
  23.  
  24. import unittest
  25. import sys
  26. import threading
  27. import pysqlite2.dbapi2 as sqlite
  28.  
  29. class ModuleTests(unittest.TestCase):
  30.     def CheckAPILevel(self):
  31.         self.assertEqual(sqlite.apilevel, "2.0",
  32.                          "apilevel is %s, should be 2.0" % sqlite.apilevel)
  33.  
  34.     def CheckThreadSafety(self):
  35.         self.assertEqual(sqlite.threadsafety, 1,
  36.                          "threadsafety is %d, should be 1" % sqlite.threadsafety)
  37.  
  38.     def CheckParamStyle(self):
  39.         self.assertEqual(sqlite.paramstyle, "qmark",
  40.                          "paramstyle is '%s', should be 'qmark'" %
  41.                          sqlite.paramstyle)
  42.  
  43.     def CheckWarning(self):
  44.         self.assert_(issubclass(sqlite.Warning, StandardError),
  45.                      "Warning is not a subclass of StandardError")
  46.  
  47.     def CheckError(self):
  48.         self.failUnless(issubclass(sqlite.Error, StandardError),
  49.                         "Error is not a subclass of StandardError")
  50.  
  51.     def CheckInterfaceError(self):
  52.         self.failUnless(issubclass(sqlite.InterfaceError, sqlite.Error),
  53.                         "InterfaceError is not a subclass of Error")
  54.  
  55.     def CheckDatabaseError(self):
  56.         self.failUnless(issubclass(sqlite.DatabaseError, sqlite.Error),
  57.                         "DatabaseError is not a subclass of Error")
  58.  
  59.     def CheckDataError(self):
  60.         self.failUnless(issubclass(sqlite.DataError, sqlite.DatabaseError),
  61.                         "DataError is not a subclass of DatabaseError")
  62.  
  63.     def CheckOperationalError(self):
  64.         self.failUnless(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
  65.                         "OperationalError is not a subclass of DatabaseError")
  66.  
  67.     def CheckIntegrityError(self):
  68.         self.failUnless(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
  69.                         "IntegrityError is not a subclass of DatabaseError")
  70.  
  71.     def CheckInternalError(self):
  72.         self.failUnless(issubclass(sqlite.InternalError, sqlite.DatabaseError),
  73.                         "InternalError is not a subclass of DatabaseError")
  74.  
  75.     def CheckProgrammingError(self):
  76.         self.failUnless(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
  77.                         "ProgrammingError is not a subclass of DatabaseError")
  78.  
  79.     def CheckNotSupportedError(self):
  80.         self.failUnless(issubclass(sqlite.NotSupportedError,
  81.                                    sqlite.DatabaseError),
  82.                         "NotSupportedError is not a subclass of DatabaseError")
  83.  
  84. class ConnectionTests(unittest.TestCase):
  85.     def setUp(self):
  86.         self.cx = sqlite.connect(":memory:")
  87.         cu = self.cx.cursor()
  88.         cu.execute("create table test(id integer primary key, name text)")
  89.         cu.execute("insert into test(name) values (?)", ("foo",))
  90.  
  91.     def tearDown(self):
  92.         self.cx.close()
  93.  
  94.     def CheckCommit(self):
  95.         self.cx.commit()
  96.  
  97.     def CheckCommitAfterNoChanges(self):
  98.         """
  99.         A commit should also work when no changes were made to the database.
  100.         """
  101.         self.cx.commit()
  102.         self.cx.commit()
  103.  
  104.     def CheckRollback(self):
  105.         self.cx.rollback()
  106.  
  107.     def CheckRollbackAfterNoChanges(self):
  108.         """
  109.         A rollback should also work when no changes were made to the database.
  110.         """
  111.         self.cx.rollback()
  112.         self.cx.rollback()
  113.  
  114.     def CheckCursor(self):
  115.         cu = self.cx.cursor()
  116.  
  117.     def CheckFailedOpen(self):
  118.         YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
  119.         try:
  120.             con = sqlite.connect(YOU_CANNOT_OPEN_THIS)
  121.         except sqlite.OperationalError:
  122.             return
  123.         self.fail("should have raised an OperationalError")
  124.  
  125.     def CheckClose(self):
  126.         self.cx.close()
  127.  
  128.     def CheckExceptions(self):
  129.         # Optional DB-API extension.
  130.         self.failUnlessEqual(self.cx.Warning, sqlite.Warning)
  131.         self.failUnlessEqual(self.cx.Error, sqlite.Error)
  132.         self.failUnlessEqual(self.cx.InterfaceError, sqlite.InterfaceError)
  133.         self.failUnlessEqual(self.cx.DatabaseError, sqlite.DatabaseError)
  134.         self.failUnlessEqual(self.cx.DataError, sqlite.DataError)
  135.         self.failUnlessEqual(self.cx.OperationalError, sqlite.OperationalError)
  136.         self.failUnlessEqual(self.cx.IntegrityError, sqlite.IntegrityError)
  137.         self.failUnlessEqual(self.cx.InternalError, sqlite.InternalError)
  138.         self.failUnlessEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
  139.         self.failUnlessEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
  140.  
  141. class CursorTests(unittest.TestCase):
  142.     def setUp(self):
  143.         self.cx = sqlite.connect(":memory:")
  144.         self.cu = self.cx.cursor()
  145.         self.cu.execute("create table test(id integer primary key, name text, income number)")
  146.         self.cu.execute("insert into test(name) values (?)", ("foo",))
  147.  
  148.     def tearDown(self):
  149.         self.cu.close()
  150.         self.cx.close()
  151.  
  152.     def CheckExecuteNoArgs(self):
  153.         self.cu.execute("delete from test")
  154.  
  155.     def CheckExecuteIllegalSql(self):
  156.         try:
  157.             self.cu.execute("select asdf")
  158.             self.fail("should have raised an OperationalError")
  159.         except sqlite.OperationalError:
  160.             return
  161.         except:
  162.             self.fail("raised wrong exception")
  163.  
  164.     def CheckExecuteTooMuchSql(self):
  165.         try:
  166.             self.cu.execute("select 5+4; select 4+5")
  167.             self.fail("should have raised a Warning")
  168.         except sqlite.Warning:
  169.             return
  170.         except:
  171.             self.fail("raised wrong exception")
  172.  
  173.     def CheckExecuteTooMuchSql2(self):
  174.         self.cu.execute("select 5+4; -- foo bar")
  175.  
  176.     def CheckExecuteTooMuchSql3(self):
  177.         self.cu.execute("""
  178.             select 5+4;
  179.  
  180.             /*
  181.             foo
  182.             */
  183.             """)
  184.  
  185.     def CheckExecuteWrongSqlArg(self):
  186.         try:
  187.             self.cu.execute(42)
  188.             self.fail("should have raised a ValueError")
  189.         except ValueError:
  190.             return
  191.         except:
  192.             self.fail("raised wrong exception.")
  193.  
  194.     def CheckExecuteArgInt(self):
  195.         self.cu.execute("insert into test(id) values (?)", (42,))
  196.  
  197.     def CheckExecuteArgFloat(self):
  198.         self.cu.execute("insert into test(income) values (?)", (2500.32,))
  199.  
  200.     def CheckExecuteArgString(self):
  201.         self.cu.execute("insert into test(name) values (?)", ("Hugo",))
  202.  
  203.     def CheckExecuteWrongNoOfArgs1(self):
  204.         # too many parameters
  205.         try:
  206.             self.cu.execute("insert into test(id) values (?)", (17, "Egon"))
  207.             self.fail("should have raised ProgrammingError")
  208.         except sqlite.ProgrammingError:
  209.             pass
  210.  
  211.     def CheckExecuteWrongNoOfArgs2(self):
  212.         # too little parameters
  213.         try:
  214.             self.cu.execute("insert into test(id) values (?)")
  215.             self.fail("should have raised ProgrammingError")
  216.         except sqlite.ProgrammingError:
  217.             pass
  218.  
  219.     def CheckExecuteWrongNoOfArgs3(self):
  220.         # no parameters, parameters are needed
  221.         try:
  222.             self.cu.execute("insert into test(id) values (?)")
  223.             self.fail("should have raised ProgrammingError")
  224.         except sqlite.ProgrammingError:
  225.             pass
  226.  
  227.     def CheckExecuteParamList(self):
  228.         self.cu.execute("insert into test(name) values ('foo')")
  229.         self.cu.execute("select name from test where name=?", ["foo"])
  230.         row = self.cu.fetchone()
  231.         self.failUnlessEqual(row[0], "foo")
  232.  
  233.     def CheckExecuteParamSequence(self):
  234.         class L(object):
  235.             def __len__(self):
  236.                 return 1
  237.             def __getitem__(self, x):
  238.                 assert x == 0
  239.                 return "foo"
  240.  
  241.         self.cu.execute("insert into test(name) values ('foo')")
  242.         self.cu.execute("select name from test where name=?", L())
  243.         row = self.cu.fetchone()
  244.         self.failUnlessEqual(row[0], "foo")
  245.  
  246.     def CheckExecuteDictMapping(self):
  247.         self.cu.execute("insert into test(name) values ('foo')")
  248.         self.cu.execute("select name from test where name=:name", {"name": "foo"})
  249.         row = self.cu.fetchone()
  250.         self.failUnlessEqual(row[0], "foo")
  251.  
  252.     def CheckExecuteDictMapping_Mapping(self):
  253.         # Test only works with Python 2.5 or later
  254.         if sys.version_info < (2, 5, 0):
  255.             return
  256.  
  257.         class D(dict):
  258.             def __missing__(self, key):
  259.                 return "foo"
  260.  
  261.         self.cu.execute("insert into test(name) values ('foo')")
  262.         self.cu.execute("select name from test where name=:name", D())
  263.         row = self.cu.fetchone()
  264.         self.failUnlessEqual(row[0], "foo")
  265.  
  266.     def CheckExecuteDictMappingTooLittleArgs(self):
  267.         self.cu.execute("insert into test(name) values ('foo')")
  268.         try:
  269.             self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})
  270.             self.fail("should have raised ProgrammingError")
  271.         except sqlite.ProgrammingError:
  272.             pass
  273.  
  274.     def CheckExecuteDictMappingNoArgs(self):
  275.         self.cu.execute("insert into test(name) values ('foo')")
  276.         try:
  277.             self.cu.execute("select name from test where name=:name")
  278.             self.fail("should have raised ProgrammingError")
  279.         except sqlite.ProgrammingError:
  280.             pass
  281.  
  282.     def CheckExecuteDictMappingUnnamed(self):
  283.         self.cu.execute("insert into test(name) values ('foo')")
  284.         try:
  285.             self.cu.execute("select name from test where name=?", {"name": "foo"})
  286.             self.fail("should have raised ProgrammingError")
  287.         except sqlite.ProgrammingError:
  288.             pass
  289.  
  290.     def CheckClose(self):
  291.         self.cu.close()
  292.  
  293.     def CheckRowcountExecute(self):
  294.         self.cu.execute("delete from test")
  295.         self.cu.execute("insert into test(name) values ('foo')")
  296.         self.cu.execute("insert into test(name) values ('foo')")
  297.         self.cu.execute("update test set name='bar'")
  298.         self.failUnlessEqual(self.cu.rowcount, 2)
  299.  
  300.     def CheckRowcountSelect(self):
  301.         """
  302.         pysqlite does not know the rowcount of SELECT statements, because we
  303.         don't fetch all rows after executing the select statement. The rowcount
  304.         has thus to be -1.
  305.         """
  306.         self.cu.execute("select 5 union select 6")
  307.         self.failUnlessEqual(self.cu.rowcount, -1)
  308.  
  309.     def CheckRowcountExecutemany(self):
  310.         self.cu.execute("delete from test")
  311.         self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
  312.         self.failUnlessEqual(self.cu.rowcount, 3)
  313.  
  314.     def CheckTotalChanges(self):
  315.         self.cu.execute("insert into test(name) values ('foo')")
  316.         self.cu.execute("insert into test(name) values ('foo')")
  317.         if self.cx.total_changes < 2:
  318.             self.fail("total changes reported wrong value")
  319.  
  320.     # Checks for executemany:
  321.     # Sequences are required by the DB-API, iterators
  322.     # enhancements in pysqlite.
  323.  
  324.     def CheckExecuteManySequence(self):
  325.         self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])
  326.  
  327.     def CheckExecuteManyIterator(self):
  328.         class MyIter:
  329.             def __init__(self):
  330.                 self.value = 5
  331.  
  332.             def next(self):
  333.                 if self.value == 10:
  334.                     raise StopIteration
  335.                 else:
  336.                     self.value += 1
  337.                     return (self.value,)
  338.  
  339.         self.cu.executemany("insert into test(income) values (?)", MyIter())
  340.  
  341.     def CheckExecuteManyGenerator(self):
  342.         def mygen():
  343.             for i in range(5):
  344.                 yield (i,)
  345.  
  346.         self.cu.executemany("insert into test(income) values (?)", mygen())
  347.  
  348.     def CheckExecuteManyWrongSqlArg(self):
  349.         try:
  350.             self.cu.executemany(42, [(3,)])
  351.             self.fail("should have raised a ValueError")
  352.         except ValueError:
  353.             return
  354.         except:
  355.             self.fail("raised wrong exception.")
  356.  
  357.     def CheckExecuteManySelect(self):
  358.         try:
  359.             self.cu.executemany("select ?", [(3,)])
  360.             self.fail("should have raised a ProgrammingError")
  361.         except sqlite.ProgrammingError:
  362.             return
  363.         except:
  364.             self.fail("raised wrong exception.")
  365.  
  366.     def CheckExecuteManyNotIterable(self):
  367.         try:
  368.             self.cu.executemany("insert into test(income) values (?)", 42)
  369.             self.fail("should have raised a TypeError")
  370.         except TypeError:
  371.             return
  372.         except Exception, e:
  373.             print "raised", e.__class__
  374.             self.fail("raised wrong exception.")
  375.  
  376.     def CheckFetchIter(self):
  377.         # Optional DB-API extension.
  378.         self.cu.execute("delete from test")
  379.         self.cu.execute("insert into test(id) values (?)", (5,))
  380.         self.cu.execute("insert into test(id) values (?)", (6,))
  381.         self.cu.execute("select id from test order by id")
  382.         lst = []
  383.         for row in self.cu:
  384.             lst.append(row[0])
  385.         self.failUnlessEqual(lst[0], 5)
  386.         self.failUnlessEqual(lst[1], 6)
  387.  
  388.     def CheckFetchone(self):
  389.         self.cu.execute("select name from test")
  390.         row = self.cu.fetchone()
  391.         self.failUnlessEqual(row[0], "foo")
  392.         row = self.cu.fetchone()
  393.         self.failUnlessEqual(row, None)
  394.  
  395.     def CheckFetchoneNoStatement(self):
  396.         cur = self.cx.cursor()
  397.         row = cur.fetchone()
  398.         self.failUnlessEqual(row, None)
  399.  
  400.     def CheckArraySize(self):
  401.         # must default ot 1
  402.         self.failUnlessEqual(self.cu.arraysize, 1)
  403.  
  404.         # now set to 2
  405.         self.cu.arraysize = 2
  406.  
  407.         # now make the query return 3 rows
  408.         self.cu.execute("delete from test")
  409.         self.cu.execute("insert into test(name) values ('A')")
  410.         self.cu.execute("insert into test(name) values ('B')")
  411.         self.cu.execute("insert into test(name) values ('C')")
  412.         self.cu.execute("select name from test")
  413.         res = self.cu.fetchmany()
  414.  
  415.         self.failUnlessEqual(len(res), 2)
  416.  
  417.     def CheckFetchmany(self):
  418.         self.cu.execute("select name from test")
  419.         res = self.cu.fetchmany(100)
  420.         self.failUnlessEqual(len(res), 1)
  421.         res = self.cu.fetchmany(100)
  422.         self.failUnlessEqual(res, [])
  423.  
  424.     def CheckFetchmanyKwArg(self):
  425.         """Checks if fetchmany works with keyword arguments"""
  426.         self.cu.execute("select name from test")
  427.         res = self.cu.fetchmany(size=100)
  428.         self.failUnlessEqual(len(res), 1)
  429.  
  430.     def CheckFetchall(self):
  431.         self.cu.execute("select name from test")
  432.         res = self.cu.fetchall()
  433.         self.failUnlessEqual(len(res), 1)
  434.         res = self.cu.fetchall()
  435.         self.failUnlessEqual(res, [])
  436.  
  437.     def CheckSetinputsizes(self):
  438.         self.cu.setinputsizes([3, 4, 5])
  439.  
  440.     def CheckSetoutputsize(self):
  441.         self.cu.setoutputsize(5, 0)
  442.  
  443.     def CheckSetoutputsizeNoColumn(self):
  444.         self.cu.setoutputsize(42)
  445.  
  446.     def CheckCursorConnection(self):
  447.         # Optional DB-API extension.
  448.         self.failUnlessEqual(self.cu.connection, self.cx)
  449.  
  450.     def CheckWrongCursorCallable(self):
  451.         try:
  452.             def f(): pass
  453.             cur = self.cx.cursor(f)
  454.             self.fail("should have raised a TypeError")
  455.         except TypeError:
  456.             return
  457.         self.fail("should have raised a ValueError")
  458.  
  459.     def CheckCursorWrongClass(self):
  460.         class Foo: pass
  461.         foo = Foo()
  462.         try:
  463.             cur = sqlite.Cursor(foo)
  464.             self.fail("should have raised a ValueError")
  465.         except TypeError:
  466.             pass
  467.  
  468. class ThreadTests(unittest.TestCase):
  469.     def setUp(self):
  470.         self.con = sqlite.connect(":memory:")
  471.         self.cur = self.con.cursor()
  472.         self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")
  473.  
  474.     def tearDown(self):
  475.         self.cur.close()
  476.         self.con.close()
  477.  
  478.     def CheckConCursor(self):
  479.         def run(con, errors):
  480.             try:
  481.                 cur = con.cursor()
  482.                 errors.append("did not raise ProgrammingError")
  483.                 return
  484.             except sqlite.ProgrammingError:
  485.                 return
  486.             except:
  487.                 errors.append("raised wrong exception")
  488.  
  489.         errors = []
  490.         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
  491.         t.start()
  492.         t.join()
  493.         if len(errors) > 0:
  494.             self.fail("\n".join(errors))
  495.  
  496.     def CheckConCommit(self):
  497.         def run(con, errors):
  498.             try:
  499.                 con.commit()
  500.                 errors.append("did not raise ProgrammingError")
  501.                 return
  502.             except sqlite.ProgrammingError:
  503.                 return
  504.             except:
  505.                 errors.append("raised wrong exception")
  506.  
  507.         errors = []
  508.         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
  509.         t.start()
  510.         t.join()
  511.         if len(errors) > 0:
  512.             self.fail("\n".join(errors))
  513.  
  514.     def CheckConRollback(self):
  515.         def run(con, errors):
  516.             try:
  517.                 con.rollback()
  518.                 errors.append("did not raise ProgrammingError")
  519.                 return
  520.             except sqlite.ProgrammingError:
  521.                 return
  522.             except:
  523.                 errors.append("raised wrong exception")
  524.  
  525.         errors = []
  526.         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
  527.         t.start()
  528.         t.join()
  529.         if len(errors) > 0:
  530.             self.fail("\n".join(errors))
  531.  
  532.     def CheckConClose(self):
  533.         def run(con, errors):
  534.             try:
  535.                 con.close()
  536.                 errors.append("did not raise ProgrammingError")
  537.                 return
  538.             except sqlite.ProgrammingError:
  539.                 return
  540.             except:
  541.                 errors.append("raised wrong exception")
  542.  
  543.         errors = []
  544.         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
  545.         t.start()
  546.         t.join()
  547.         if len(errors) > 0:
  548.             self.fail("\n".join(errors))
  549.  
  550.     def CheckCurImplicitBegin(self):
  551.         def run(cur, errors):
  552.             try:
  553.                 cur.execute("insert into test(name) values ('a')")
  554.                 errors.append("did not raise ProgrammingError")
  555.                 return
  556.             except sqlite.ProgrammingError:
  557.                 return
  558.             except:
  559.                 errors.append("raised wrong exception")
  560.  
  561.         errors = []
  562.         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
  563.         t.start()
  564.         t.join()
  565.         if len(errors) > 0:
  566.             self.fail("\n".join(errors))
  567.  
  568.     def CheckCurClose(self):
  569.         def run(cur, errors):
  570.             try:
  571.                 cur.close()
  572.                 errors.append("did not raise ProgrammingError")
  573.                 return
  574.             except sqlite.ProgrammingError:
  575.                 return
  576.             except:
  577.                 errors.append("raised wrong exception")
  578.  
  579.         errors = []
  580.         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
  581.         t.start()
  582.         t.join()
  583.         if len(errors) > 0:
  584.             self.fail("\n".join(errors))
  585.  
  586.     def CheckCurExecute(self):
  587.         def run(cur, errors):
  588.             try:
  589.                 cur.execute("select name from test")
  590.                 errors.append("did not raise ProgrammingError")
  591.                 return
  592.             except sqlite.ProgrammingError:
  593.                 return
  594.             except:
  595.                 errors.append("raised wrong exception")
  596.  
  597.         errors = []
  598.         self.cur.execute("insert into test(name) values ('a')")
  599.         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
  600.         t.start()
  601.         t.join()
  602.         if len(errors) > 0:
  603.             self.fail("\n".join(errors))
  604.  
  605.     def CheckCurIterNext(self):
  606.         def run(cur, errors):
  607.             try:
  608.                 row = cur.fetchone()
  609.                 errors.append("did not raise ProgrammingError")
  610.                 return
  611.             except sqlite.ProgrammingError:
  612.                 return
  613.             except:
  614.                 errors.append("raised wrong exception")
  615.  
  616.         errors = []
  617.         self.cur.execute("insert into test(name) values ('a')")
  618.         self.cur.execute("select name from test")
  619.         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
  620.         t.start()
  621.         t.join()
  622.         if len(errors) > 0:
  623.             self.fail("\n".join(errors))
  624.  
  625. class ConstructorTests(unittest.TestCase):
  626.     def CheckDate(self):
  627.         d = sqlite.Date(2004, 10, 28)
  628.  
  629.     def CheckTime(self):
  630.         t = sqlite.Time(12, 39, 35)
  631.  
  632.     def CheckTimestamp(self):
  633.         ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
  634.  
  635.     def CheckDateFromTicks(self):
  636.         d = sqlite.DateFromTicks(42)
  637.  
  638.     def CheckTimeFromTicks(self):
  639.         t = sqlite.TimeFromTicks(42)
  640.  
  641.     def CheckTimestampFromTicks(self):
  642.         ts = sqlite.TimestampFromTicks(42)
  643.  
  644.     def CheckBinary(self):
  645.         b = sqlite.Binary(chr(0) + "'")
  646.  
  647. class ExtensionTests(unittest.TestCase):
  648.     def CheckScriptStringSql(self):
  649.         con = sqlite.connect(":memory:")
  650.         cur = con.cursor()
  651.         cur.executescript("""
  652.             -- bla bla
  653.             /* a stupid comment */
  654.             create table a(i);
  655.             insert into a(i) values (5);
  656.             """)
  657.         cur.execute("select i from a")
  658.         res = cur.fetchone()[0]
  659.         self.failUnlessEqual(res, 5)
  660.  
  661.     def CheckScriptStringUnicode(self):
  662.         con = sqlite.connect(":memory:")
  663.         cur = con.cursor()
  664.         cur.executescript(u"""
  665.             create table a(i);
  666.             insert into a(i) values (5);
  667.             select i from a;
  668.             delete from a;
  669.             insert into a(i) values (6);
  670.             """)
  671.         cur.execute("select i from a")
  672.         res = cur.fetchone()[0]
  673.         self.failUnlessEqual(res, 6)
  674.  
  675.     def CheckScriptSyntaxError(self):
  676.         con = sqlite.connect(":memory:")
  677.         cur = con.cursor()
  678.         raised = False
  679.         try:
  680.             cur.executescript("create table test(x); asdf; create table test2(x)")
  681.         except sqlite.OperationalError:
  682.             raised = True
  683.         self.failUnlessEqual(raised, True, "should have raised an exception")
  684.  
  685.     def CheckScriptErrorNormal(self):
  686.         con = sqlite.connect(":memory:")
  687.         cur = con.cursor()
  688.         raised = False
  689.         try:
  690.             cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")
  691.         except sqlite.OperationalError:
  692.             raised = True
  693.         self.failUnlessEqual(raised, True, "should have raised an exception")
  694.  
  695.     def CheckConnectionExecute(self):
  696.         con = sqlite.connect(":memory:")
  697.         result = con.execute("select 5").fetchone()[0]
  698.         self.failUnlessEqual(result, 5, "Basic test of Connection.execute")
  699.  
  700.     def CheckConnectionExecutemany(self):
  701.         con = sqlite.connect(":memory:")
  702.         con.execute("create table test(foo)")
  703.         con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
  704.         result = con.execute("select foo from test order by foo").fetchall()
  705.         self.failUnlessEqual(result[0][0], 3, "Basic test of Connection.executemany")
  706.         self.failUnlessEqual(result[1][0], 4, "Basic test of Connection.executemany")
  707.  
  708.     def CheckConnectionExecutescript(self):
  709.         con = sqlite.connect(":memory:")
  710.         con.executescript("create table test(foo); insert into test(foo) values (5);")
  711.         result = con.execute("select foo from test").fetchone()[0]
  712.         self.failUnlessEqual(result, 5, "Basic test of Connection.executescript")
  713.  
  714. class ClosedConTests(unittest.TestCase):
  715.     def setUp(self):
  716.         pass
  717.  
  718.     def tearDown(self):
  719.         pass
  720.  
  721.     def CheckClosedConCursor(self):
  722.         con = sqlite.connect(":memory:")
  723.         con.close()
  724.         try:
  725.             cur = con.cursor()
  726.             self.fail("Should have raised a ProgrammingError")
  727.         except sqlite.ProgrammingError:
  728.             pass
  729.         except:
  730.             self.fail("Should have raised a ProgrammingError")
  731.  
  732.     def CheckClosedConCommit(self):
  733.         con = sqlite.connect(":memory:")
  734.         con.close()
  735.         try:
  736.             con.commit()
  737.             self.fail("Should have raised a ProgrammingError")
  738.         except sqlite.ProgrammingError:
  739.             pass
  740.         except:
  741.             self.fail("Should have raised a ProgrammingError")
  742.  
  743.     def CheckClosedConRollback(self):
  744.         con = sqlite.connect(":memory:")
  745.         con.close()
  746.         try:
  747.             con.rollback()
  748.             self.fail("Should have raised a ProgrammingError")
  749.         except sqlite.ProgrammingError:
  750.             pass
  751.         except:
  752.             self.fail("Should have raised a ProgrammingError")
  753.  
  754.     def CheckClosedCurExecute(self):
  755.         con = sqlite.connect(":memory:")
  756.         cur = con.cursor()
  757.         con.close()
  758.         try:
  759.             cur.execute("select 4")
  760.             self.fail("Should have raised a ProgrammingError")
  761.         except sqlite.ProgrammingError:
  762.             pass
  763.         except:
  764.             self.fail("Should have raised a ProgrammingError")
  765.  
  766. class ClosedCurTests(unittest.TestCase):
  767.     def setUp(self):
  768.         pass
  769.  
  770.     def tearDown(self):
  771.         pass
  772.  
  773.     def CheckClosed(self):
  774.         con = sqlite.connect(":memory:")
  775.         cur = con.cursor()
  776.         cur.close()
  777.  
  778.         for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"):
  779.             if method_name in ("execute", "executescript"):
  780.                 params = ("select 4 union select 5",)
  781.             elif method_name == "executemany":
  782.                 params = ("insert into foo(bar) values (?)", [(3,), (4,)])
  783.             else:
  784.                 params = []
  785.  
  786.             try:
  787.                 method = getattr(cur, method_name)
  788.  
  789.                 method(*params)
  790.                 self.fail("Should have raised a ProgrammingError: method " + method_name)
  791.             except sqlite.ProgrammingError:
  792.                 pass
  793.             except:
  794.                 self.fail("Should have raised a ProgrammingError: " + method_name)
  795.  
  796. def suite():
  797.     module_suite = unittest.makeSuite(ModuleTests, "Check")
  798.     connection_suite = unittest.makeSuite(ConnectionTests, "Check")
  799.     cursor_suite = unittest.makeSuite(CursorTests, "Check")
  800.     thread_suite = unittest.makeSuite(ThreadTests, "Check")
  801.     constructor_suite = unittest.makeSuite(ConstructorTests, "Check")
  802.     ext_suite = unittest.makeSuite(ExtensionTests, "Check")
  803.     closed_con_suite = unittest.makeSuite(ClosedConTests, "Check")
  804.     closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check")
  805.     return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite))
  806.  
  807. def test():
  808.     runner = unittest.TextTestRunner()
  809.     runner.run(suite())
  810.  
  811. if __name__ == "__main__":
  812.     test()
  813.